package middlewareconf

import (
	"context"
	"errors"
	"fmt"
	"strconv"
	"strings"
	"time"

	"gitee.com/xfrm/middleware/xlog"
)

// MQConfig holds the settings for a single message-queue topic.
//
// GetMQConfig builds it from the per-topic configuration items and
// substitutes package defaults for items that are not configured. TimeOut,
// BatchTimeout and RequestInterval are derived there from millisecond-valued
// items; CommitInterval, Offset and SendTimeout are set there to fixed
// values (one second, FirstOffset and the package default send timeout).
type MQConfig struct {
	MQType         MiddlewareType
	MQAddr         []string
	Topic          string
	TimeOut        time.Duration
	CommitInterval time.Duration
	Offset         int64
	OffsetAt       string
	BatchSize      int
	Async          bool
	// BatchTimeout is the time interval after which buffered messages are
	// flushed to the broker; the default is 1 second.
	BatchTimeout time.Duration
	// BatchKBSize is the maximum number of bytes permitted in a batch
	// (for pulsar).
	BatchKBSize uint
	// SubscriptionType is the subscription type (for pulsar).
	SubscriptionType int
	MaxBytes         int

	TTR   uint32 // time to run
	TTL   uint32 // time to live
	Tries uint16 // delay tries

	RequestInterval time.Duration // client request interval

	SendTimeout time.Duration
}

// MQKeyParts holds the pieces of an MQ SDK config key that ParseMQKey
// extracts: the topic and the group.
type MQKeyParts struct {
	Topic string
	Group string
}

// GetMQConfig assembles the MQConfig for topic and mqType from the
// configuration items resolved through the p.getConfig*ItemWithFallback
// lookups.
//
// The broker list is mandatory: an error is returned when the item is
// missing or when it contains no non-blank address. Every other item is
// optional and falls back to its package default when absent. Numeric items
// that are later narrowed to an unsigned field (TTR, TTL, Tries, BatchKBSize)
// also fall back to the default when the configured value does not fit the
// field, so a bad value can never wrap around silently.
func (p *MiddleConf) GetMQConfig(ctx context.Context, topic string, mqType MiddlewareType) (*MQConfig, error) {
	// Upper bounds of the unsigned fields the int config values are stored in.
	const (
		maxUint32 = 1<<32 - 1
		maxUint16 = 1<<16 - 1
	)

	fun := "MiddleConf.GetMQConfig-->"
	xlog.Infof(ctx, "%s get mq config topic:%s", fun, topic)

	brokersVal, ok := p.getConfigStringItemWithFallback(ctx, topic, apolloMQBrokersKey, mqType)
	if !ok {
		return nil, fmt.Errorf("%s no brokers config found", fun)
	}

	var brokers []string
	for _, broker := range strings.Split(brokersVal, ApolloMQBrokersSep) {
		// Trim before the emptiness check so whitespace-only entries
		// (e.g. "a, ,b" or a trailing ", ") do not become "" addresses.
		if broker = strings.TrimSpace(broker); broker != "" {
			brokers = append(brokers, broker)
		}
	}
	if len(brokers) == 0 {
		return nil, fmt.Errorf("%s no usable broker in brokers config %q", fun, brokersVal)
	}

	xlog.Infof(ctx, "%s got config brokers:%s", fun, brokers)

	offsetAtVal, ok := p.getConfigStringItemWithFallback(ctx, topic, apolloMQOffsetAtKey, mqType)
	if !ok {
		xlog.Infof(ctx, "%s no offsetAtVal config founds", fun)
	}
	xlog.Infof(ctx, "%s got config offsetAt:%s", fun, offsetAtVal)

	ttr, ok := p.getConfigIntItemWithFallback(ctx, topic, apolloMQTTRKey, mqType)
	if !ok {
		xlog.Infof(ctx, "%s get config ttr no found, use default: %v", fun, defaultMQTTR)
		ttr = defaultMQTTR
	} else if ttr < 0 || int64(ttr) > maxUint32 {
		xlog.Infof(ctx, "%s config ttr %d out of uint32 range, use default: %v", fun, ttr, defaultMQTTR)
		ttr = defaultMQTTR
	}
	xlog.Infof(ctx, "%s got config TTR:%d", fun, ttr)

	ttl, ok := p.getConfigIntItemWithFallback(ctx, topic, apolloMQTTLKey, mqType)
	if !ok {
		xlog.Infof(ctx, "%s get config ttl no found, use default: %v", fun, defaultMQTTL)
		ttl = defaultMQTTL
	} else if ttl < 0 || int64(ttl) > maxUint32 {
		xlog.Infof(ctx, "%s config ttl %d out of uint32 range, use default: %v", fun, ttl, defaultMQTTL)
		ttl = defaultMQTTL
	}
	xlog.Infof(ctx, "%s got config TTL:%d", fun, ttl)

	tries, ok := p.getConfigIntItemWithFallback(ctx, topic, apolloMQTriesKey, mqType)
	if !ok {
		xlog.Infof(ctx, "%s get config tries no found, use default: %v", fun, defaultMQTries)
		tries = defaultMQTries
	} else if tries < 0 || tries > maxUint16 {
		xlog.Infof(ctx, "%s config tries %d out of uint16 range, use default: %v", fun, tries, defaultMQTries)
		tries = defaultMQTries
	}
	xlog.Infof(ctx, "%s got config tries: %d", fun, tries)

	batchSize, ok := p.getConfigIntItemWithFallback(ctx, topic, apolloMQBatchSizeKey, mqType)
	if !ok {
		xlog.Infof(ctx, "%s get config batchSize no found, use default: %v", fun, defaultMQBatchSize)
		batchSize = defaultMQBatchSize
	}
	xlog.Infof(ctx, "%s got config batchSize: %d", fun, batchSize)

	batchTimeoutMs, ok := p.getConfigIntItemWithFallback(ctx, topic, apolloMQBatchTimeoutMsKey, mqType)
	if !ok {
		xlog.Infof(ctx, "%s get config batchTimeoutMs no found, use default: %v", fun, defaultMQBatchTimeoutMs)
		batchTimeoutMs = defaultMQBatchTimeoutMs
	}
	xlog.Infof(ctx, "%s got config batchTimeout:%d", fun, batchTimeoutMs)

	timeoutMs, ok := p.getConfigIntItemWithFallback(ctx, topic, apolloTimeoutMsKey, mqType)
	if !ok {
		xlog.Debugf(ctx, "%s get config writeTimeoutMs not found, use default: %v", fun, defaultMQTimeout)
		timeoutMs = defaultMQTimeout
	}
	xlog.Infof(ctx, "%s got config timeoutms: %d", fun, timeoutMs)

	requestIntervalMs, ok := p.getConfigStringItemWithFallback(ctx, topic, apolloMQRequestIntervalMS, mqType)
	if !ok {
		xlog.Infof(ctx, "%s no requestSleep config founds", fun)
	}
	requestIntervalMsVal, err := strconv.ParseUint(requestIntervalMs, 10, 32)
	if err != nil {
		// A missing item was already reported above; only a configured but
		// unparsable value deserves its own log line.
		if ok {
			xlog.Infof(ctx, "%s invalid requestSleepMs config %q: %v, use default: %v", fun, requestIntervalMs, err, defaultMQInterval)
		}
		requestIntervalMsVal = defaultMQInterval
	}
	xlog.Infof(ctx, "%s got config requestSleepMs:%d", fun, requestIntervalMsVal)

	async, ok := p.getConfigBoolItemWithFallback(ctx, topic, apolloMQAsyncKey, mqType)
	if !ok {
		xlog.Infof(ctx, "%s no async config founds", fun)
	}
	xlog.Infof(ctx, "%s got config async:%v", fun, async)

	maxBytes, ok := p.getConfigIntItemWithFallback(ctx, topic, apolloMQMaxBytesKey, mqType)
	if !ok || maxBytes <= 1 {
		xlog.Infof(ctx, "%s no maxbytes config founds", fun)
		maxBytes = defaultMQMaxBytes
	}
	xlog.Infof(ctx, "%s got config maxbytes:%v", fun, maxBytes)

	batchKBSize, ok := p.getConfigIntItemWithFallback(ctx, topic, apolloMQBatchKBSizeKey, mqType)
	if !ok {
		xlog.Infof(ctx, "%s get config batchKBSize no found, use default: %v", fun, defaultMQBatchKBSize)
		batchKBSize = defaultMQBatchKBSize
	} else if batchKBSize < 0 {
		// A negative value would wrap to a huge uint below.
		xlog.Infof(ctx, "%s config batchKBSize %d is negative, use default: %v", fun, batchKBSize, defaultMQBatchKBSize)
		batchKBSize = defaultMQBatchKBSize
	}
	xlog.Infof(ctx, "%s got config batchKBSize:%d", fun, batchKBSize)

	subscriptionType, ok := p.getConfigIntItemWithFallback(ctx, topic, apolloMQSubscribeTypeKey, mqType)
	if !ok {
		xlog.Infof(ctx, "%s get config subscriptionType no found, use default: %v", fun, defaultMQSubscribeType)
		subscriptionType = defaultMQSubscribeType
	}
	xlog.Infof(ctx, "%s got config subscriptionType:%d", fun, subscriptionType)

	return &MQConfig{
		MQType:           mqType,
		MQAddr:           brokers,
		Topic:            topic,
		TimeOut:          time.Duration(timeoutMs) * time.Millisecond,
		CommitInterval:   1 * time.Second,
		Offset:           FirstOffset,
		OffsetAt:         offsetAtVal,
		TTR:              uint32(ttr),
		TTL:              uint32(ttl),
		Tries:            uint16(tries),
		BatchTimeout:     time.Duration(batchTimeoutMs) * time.Millisecond,
		BatchSize:        batchSize,
		BatchKBSize:      uint(batchKBSize),
		RequestInterval:  time.Duration(requestIntervalMsVal) * time.Millisecond,
		Async:            async,
		SubscriptionType: subscriptionType,
		MaxBytes:         maxBytes,
		SendTimeout:      time.Duration(defaultMQSendTimeoutMS) * time.Millisecond,
	}, nil
}

// ParseMQKey splits key on apolloConfigSep and extracts the topic and group.
//
// The key must contain at least four segments. The third segment from the
// end is the group; every segment before it, re-joined with apolloConfigSep,
// is the topic (so the topic itself may contain the separator). The final two
// segments are not used here. A key with fewer than four segments is logged
// and reported as an error.
func (p *MiddleConf) ParseMQKey(ctx context.Context, key string) (*MQKeyParts, error) {
	const fun = "MiddleConf.ParseMQKey-->"

	segments := strings.Split(key, apolloConfigSep)

	// Index of the group segment; the topic needs at least one segment
	// before it, hence the index must be 1 or greater.
	groupIdx := len(segments) - 3
	if groupIdx < 1 {
		msg := fmt.Sprintf("%s invalid key:%s", fun, key)
		xlog.Error(ctx, msg)
		return nil, errors.New(msg)
	}

	keyParts := &MQKeyParts{
		Group: segments[groupIdx],
		Topic: strings.Join(segments[:groupIdx], apolloConfigSep),
	}
	return keyParts, nil
}
